import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

/// Builds a single-subscription stream that emits five values of different
/// runtime types, one every 100 ms, and closes right after the last one.
///
/// The emitted sequence is `1`, `'2'`, `{'3': 3}`, `{'4': '4'}`, `5.0`.
Stream<Object> _getStream() {
  final controller = StreamController<Object>();

  // Payloads in emission order; the entry at index `i` is scheduled at
  // `(i + 1) * 100` milliseconds.
  const payloads = <Object>[
    1,
    '2',
    {'3': 3},
    {'4': '4'},
    5.0,
  ];

  for (var index = 0; index < payloads.length; index++) {
    final delay = Duration(milliseconds: 100 * (index + 1));
    final closesStream = index == payloads.length - 1;

    Timer(delay, () {
      controller.add(payloads[index]);
      // The controller is closed in the same tick as the final payload.
      if (closesStream) controller.close();
    });
  }

  return controller.stream;
}

/// Test page covering rxdart's `whereNotNull` and `whereType` operators.
///
/// Every case is declared in the constructor through `test`, and the values
/// under inspection are handed to the single-argument `expect` used
/// throughout this file.
///
/// Asynchronous cases await the stream (or an explicit [Completer]) so that
/// the callbacks calling `expect` run before the test body's future
/// completes, and controllers created by a case are closed before it ends.
class WhereTestPage extends TestPage {
  /// Creates the page with the given title and declares all test cases.
  WhereTestPage(super.title) {
    // Plain filtering, via both the extension method and the transformer.
    test('Rx.whereNotNull', () {
      {
        final notNull = Stream.fromIterable([1, 2, 3, 4]).whereNotNull();
        expect(notNull);
        expect(notNull);
      }

      {
        final notNull = Stream.fromIterable([1, 2, null, 3, 4, null])
            .transform(WhereNotNullStreamTransformer());
        expect(notNull);
        expect(notNull);
      }
    });

    // Streams that carry an error, alone or in the middle of a concatenation.
    test('Rx.whereNotNull.shouldThrow', () {
      expect(
        Stream<bool>.error(Exception()).whereNotNull(),
      );

      expect(
        Rx.concat<int?>([
          Stream.fromIterable([1, 2, null]),
          Stream.error(Exception()),
          Stream.value(3),
        ]).whereNotNull(),
      );
    });

    // A broadcast view of the filtered stream is listened to twice.
    test('Rx.whereNotNull.asBroadcastStream', () {
      final stream =
          Stream.fromIterable([1, 2, null]).whereNotNull().asBroadcastStream();

      stream.listen(null);
      stream.listen(null);

      expect(true);
    });

    // The filtered stream of a single-subscription controller is listened to
    // once directly; a second `listen` is handed to `expect` as a closure.
    test('Rx.whereNotNull.singleSubscription', () async {
      final controller = StreamController<int?>();
      final stream = controller.stream.whereNotNull();

      expect(stream.isBroadcast);
      stream.listen(null);
      expect(() => stream.listen(null));

      // Close the controller so it does not outlive the test.
      await controller.close();
    });

    // Pausing and resuming before the first event, then cancelling on it.
    test('Rx.whereNotNull.pause.resume', () async {
      final firstValueHandled = Completer<void>();
      final subscription = Stream.fromIterable([null, 2, 3, null, 4, 5, 6])
          .whereNotNull()
          .listen(null);

      subscription
        ..pause()
        ..onData((data) {
          try {
            expect(data);
          } finally {
            // Completing with the cancel future makes the test wait for the
            // teardown too; `finally` keeps the test from hanging if
            // `expect` throws.
            firstValueHandled.complete(subscription.cancel());
          }
        })
        ..resume();

      await firstValueHandled.future;
    });

    test('Rx.whereNotNull.nullable', () {
      nullableTest<String>(
        (s) => s.whereNotNull(),
      );
    });

    // `_getStream` emits over 500 ms; awaiting the loop keeps the test alive
    // until every matching event has been passed to `expect`.
    test('Rx.whereType', () async {
      await for (final result in _getStream().whereType<Map<String, int>>()) {
        expect(result);
      }
    });

    // The loop variable is typed `Object` so the `is num` check below is a
    // real runtime check rather than a statically known one.
    test('Rx.whereType.polymorphism', () async {
      await for (final Object result in _getStream().whereType<num>()) {
        expect(result is num);
      }
    });

    test('Rx.whereType.null.values', () {
      expect(
        Stream.fromIterable([null, 1, null, 'two', 3]).whereType<String>(),
      );
    });

    // A broadcast source filtered by type is listened to twice.
    test('Rx.whereType.asBroadcastStream', () {
      final stream = _getStream().asBroadcastStream().whereType<int>();

      stream.listen(null);
      stream.listen(null);

      expect(true);
    });

    // The error is delivered asynchronously, so wait for the done event that
    // follows it before letting the test finish.
    test('Rx.whereType.error.shouldThrow', () async {
      final streamWithError = Stream<void>.error(Exception()).whereType<num>();
      final finished = Completer<void>();

      streamWithError.listen(
        null,
        onError: (Object e, StackTrace s) {
          expect(e);
        },
        onDone: finished.complete,
      );

      await finished.future;
    });

    // Pausing and resuming before the single value arrives, then cancelling
    // from inside the data handler.
    test('Rx.whereType.pause.resume', () async {
      final valueHandled = Completer<void>();
      late StreamSubscription<int> subscription;
      final stream = Stream.value(1).whereType<int>();

      subscription = stream.listen((value) {
        try {
          expect(value);
        } finally {
          // See 'Rx.whereNotNull.pause.resume' for why this is in `finally`.
          valueHandled.complete(subscription.cancel());
        }
      });

      subscription.pause();
      subscription.resume();

      await valueHandled.future;
    });

    // The filtered stream of a single-subscription controller is listened to
    // once directly; a second `listen` is handed to `expect` as a closure.
    test('Rx.whereType accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.whereType<int>();

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);

      // Close the controller so it does not outlive the test.
      await controller.close();
    });

    test('Rx.whereType.nullable', () {
      nullableTest<String>(
        (s) => s.whereType<String>(),
      );
    });
  }
}